Cache
What is Python SDK Cache?
The Python SDK Cache is an in-memory data store built on Redis Cache. You can think of it as a tiny database that runs in the memory of your app and allows you to read and write data very quickly.
Stream and Scheduled apps are meant to be stateless, so they need some mechanism to share data between invokes. Cache is an in-memory, low-latency storage for such data. It provides an interface to save, load, and perform other operations on data, and it is passed as an argument to the lambda handler. Task apps do not get a cache parameter, as they are not meant to share data between invokes.
Cache uses a dict-like database, so the data is stored as key:value pairs.
Cache Data Types
Cache can store the following value types; however, the values must be cast to strings before they are saved to Cache.
- string
- integer
- float
- bytes
The cache stores the values in binary format. The data is decoded while loading, but the cache can’t assume the type of the value. That’s why the returned type is string.
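Because every loaded value comes back as a string, the caller has to convert it back to the type it expects. The following is a minimal sketch of that round trip in plain Python. The helper names to_cache_value and from_cache_value are hypothetical and are not part of the SDK, and the bytes branch assumes the bytes hold UTF-8 text.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def to_cache_value(value: object) -> str:
    """Cast a string, integer, float or bytes value to the string form saved to Cache."""
    if isinstance(value, bytes):
        # Assumption: the bytes hold UTF-8 encoded text.
        return value.decode("utf-8")
    return str(value)


def from_cache_value(raw: Optional[str], cast: Callable[[str], T]) -> Optional[T]:
    """Convert a loaded string back to the caller's type; None means the key is missing."""
    return None if raw is None else cast(raw)


stored = to_cache_value(1675963750)      # what would be passed as value= to cache.set
restored = from_cache_value(stored, int)  # what the app does after cache.get
```

Keeping the conversion in one place makes it harder to compare a loaded string against a number by accident.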
Cache Memory
Cache has a maximum memory limit of 100 MB. It is recommended to stay below 100 KB.
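One way to keep an eye on the recommendation is to measure a value before saving it. The sketch below is only an illustration: it assumes the 100 KB recommendation is applied to the UTF-8 size of a single value and that 1 KB is 1024 bytes, neither of which is stated in this document. The function names are hypothetical.

```python
# Assumption: the recommendation is checked per value, with 1 KB = 1024 bytes.
RECOMMENDED_LIMIT_BYTES = 100 * 1024


def encoded_size(value: str) -> int:
    """Return the size of the value in bytes once encoded as UTF-8."""
    return len(value.encode("utf-8"))


def within_recommended_size(value: str, limit: int = RECOMMENDED_LIMIT_BYTES) -> bool:
    """Return True if the encoded value does not exceed the recommended size."""
    return encoded_size(value) <= limit
```

An app could log a warning, or trim the data, when within_recommended_size returns False.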
Cache Key Expiry
By default, keys expire in 60 days.
Cache Key Format
/well/<asset_id>/stream/<app_stream_id>/<app_key>/<app_connection_id>
Where:
- asset_id: The Corva unique identifier of the asset.
- app_stream_id: The Corva unique identifier of a stream of an asset. Backend apps must be assigned to a stream in order to invoke.
- app_key: The Corva name of the app found in the manifest.json file.
- app_connection_id: A Corva unique identifier for connecting an app to a stream. Each app has its own unique app_connection_id to the stream.
Example of logging the cache key:
# 1. Import required functionality.
from corva import Api, Cache, Logger, StreamTimeEvent, stream


# 2. Decorate your function using @stream. Use the existing lambda_handler function or define your own function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
@stream
def lambda_handler(event: StreamTimeEvent, api: Api, cache: Cache):
    # 3. Here is where you can declare your variables from the argument event: StreamTimeEvent and start using Api, Cache and Logger functionalities. You can obtain key values directly from metadata in the stream app event without making any additional API requests.
    # You have access to asset_id, company_id, and real-time data records from event. This example only needs records.
    records = event.records
    # Each element of records has a timestamp. You can declare variables for timestamps.
    timestamp = records[0].timestamp
    # Utilize the Cache functionality to set a key and a value. The value is cast to a string before saving.
    cache.set(key="timestamp", value=str(timestamp))
    # Utilize the Cache functionality to get the key value. The loaded value is a string.
    assert cache.get(key="timestamp") == str(timestamp)
    # Utilize the Logger functionality to log the cache key and the associated attributes.
    # Note: This may change after any SDK update, breaking the app. Please remove it once you have reviewed the key attributes.
    Logger.info(cache.cache_repo.hash_name)
Example of app logs when logging the cache key:
2023-02-09 11:29:10 (UTC -06:00)
START RequestId: 3d6dfa8a-bb7f-4412-b142-6884a9cfcab3 Version: 13
#In this log, please note the following cache key attributes of sample/well/38369059/stream/95233/sample.cache_example_app/1755656, where:
# 1. asset_id = 38369059
# 2. app_stream_id = 95233
# 3. app_key = sample.cache_example_app
# 4. app_connection_id = 1755656
2023-02-09 11:29:10 (UTC -06:00)
2023-02-09T17:29:10.320Z 3d6dfa8a-bb7f-4412-b142-6884a9cfcab3 INFO ASSET=38369059 AC=1755656 | sample/well/38369059/stream/95233/sample.cache_example_app/1755656
2023-02-09 11:29:10 (UTC -06:00)
END RequestId: 3d6dfa8a-bb7f-4412-b142-6884a9cfcab3
2023-02-09 11:29:10 (UTC -06:00)
REPORT RequestId: 3d6dfa8a-bb7f-4412-b142-6884a9cfcab3 Duration: 104.13 ms Billed Duration: 105 ms Memory Size: 128 MB Max Memory Used: 64 MB
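The key shown in the log above can be split back into its attributes. The following sketch assumes the layout described under Cache Key Format, optionally preceded by a prefix segment such as the "sample" seen in the log. parse_cache_key is a hypothetical helper, not an SDK function.

```python
from typing import Dict


def parse_cache_key(key: str) -> Dict[str, str]:
    """Split a cache key into the attributes described under Cache Key Format."""
    parts = key.strip("/").split("/")
    if "well" not in parts:
        raise ValueError(f"unexpected cache key: {key!r}")
    # Ignore anything before "well", e.g. the "sample" prefix seen in the log.
    fields = parts[parts.index("well"):]
    if len(fields) != 6 or fields[2] != "stream":
        raise ValueError(f"unexpected cache key: {key!r}")
    return {
        "asset_id": fields[1],
        "app_stream_id": fields[3],
        "app_key": fields[4],
        "app_connection_id": fields[5],
    }


attributes = parse_cache_key(
    "sample/well/38369059/stream/95233/sample.cache_example_app/1755656"
)
```

Like the logging example itself, this relies on the current key layout and may stop working after an SDK update.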
When to use Python SDK Cache?
The cache functionality is needed when an app needs to share a small amount of data between invokes e.g. timestamps in a Stream or Scheduled app.
A typical example of Cache usage is as follows:
- Store some data during app invoke #1.
- Retrieve and use the data during app invoke #2.
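The two steps above can be sketched locally without the platform. InMemoryCache below is a hypothetical stand-in for the cache argument that only mimics get and set; it is not the SDK class, and handle_invoke is an illustrative handler body.

```python
from typing import Dict, Optional


class InMemoryCache:
    """Hypothetical stand-in for the SDK Cache, for local illustration only."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self._data[key] = value

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)


def handle_invoke(latest_timestamp: int, cache: InMemoryCache) -> Optional[int]:
    """Return the seconds elapsed since the previous invoke, or None on the first one."""
    previous = cache.get(key="last_timestamp")
    # Store the current timestamp as a string for the next invoke.
    cache.set(key="last_timestamp", value=str(latest_timestamp))
    return None if previous is None else latest_timestamp - int(previous)


cache = InMemoryCache()
first = handle_invoke(1675963750, cache)   # invoke #1: nothing stored yet
second = handle_invoke(1675963810, cache)  # invoke #2: reads what invoke #1 stored
```

The handler itself keeps no state; everything that survives between invokes goes through the cache object.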
When not to use Python SDK Cache?
- When building a Task app. Task apps do not get cache functionality, as they are not meant to share data between invokes.
- When storing large data objects or amounts of data greater than 100MB.
What are some considerations when using Python SDK Cache?
App Processing Time
If you use cache and the next invocation of your app depends on the previous invocation, then your stream or scheduled app should finish before another invocation starts.
Choose the interval and timeout carefully, taking into account how many API requests you send, what logic you implement, and how long it all takes.
Removing an App from a Stream
If you remove an app from a stream and then add it back, it will not use the same cache, since cache is scoped by app connection id.
When you deploy a new version of the app, the app connection id on the stream remains the same, so the cache also remains the same.
How to use Python SDK Cache?
The following examples will demonstrate how to use the Python SDK Cache.
How to use Python SDK Cache to Set and Get
The method cache.set() stores data in cache. It can take the following parameters:
- key: key which will contain the data (string)
- value: data to be saved
- mapping: dict of key:data pairs to be saved. It allows storing multiple values at once
- expiry: time in seconds after which the data is deleted from cache (the example in section 7 passes this value as ttl)
The method cache.set() returns the number of inserted elements.
1. How to use Python SDK Cache to Set and Get a string
The following example function shows how to set a string for a rig classification in cache and get the value.
# 1. Import required functionality.
from corva import Api, Cache, Logger, ScheduledDataTimeEvent, scheduled, secrets


# 2. Decorate your function using @scheduled. Use the existing lambda_handler function or define your own function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
@scheduled
def rig_classification_check(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # 3. Here is where you can declare your variables from the argument event: ScheduledDataTimeEvent and start using Api, Cache and Logger functionalities.
    # The scheduled app can read the following attributes from the ScheduledDataTimeEvent: company_id: The company identifier; asset_id: The asset identifier; start_time: The start time of interval; end_time: The end time of interval
    asset_id = event.asset_id
    # 4. Initialize returns
    message = ""
    # 5. Check that the asset has an offshore rig classification utilizing Corva Python SDK Cache to get the key
    cache_is_floating = cache.get(key="is_floating")
    if cache_is_floating == "0":
        is_floating = False
    elif cache_is_floating == "1":
        is_floating = True
    else:  # if the cached value is None, it has not been initialized, so we need to fetch whether the rig is floating or not
        # 6. Utilize the attributes from the ScheduledDataTimeEvent to make an API request to the /v2/assets endpoint.
        # Set up headers for authorization and read the key from secrets. In this example, we are using Secrets as it is required when querying the Corva Platform API. Please reference the Corva Python SDK API documentation.
        headers = {
            "Authorization": f"{secrets['MY_API_KEY']}"
        }
        # Set up the API request parameters
        params = {
            "fields": "all"
        }
        # Set up the Corva Platform API request
        asset_info = api.get(f"https://api.corva.ai/v2/assets/{asset_id}/", headers=headers, params=params).json()
        if asset_info:
            settings = asset_info["data"]["attributes"]["settings"]
            rig_classification = settings.get("rig_classification", None)
            is_floating = rig_classification == "floating"
        else:
            Logger.info("Offshore Spud Rule: Failed to get asset info.")
            return {"message": message}
    if not is_floating:  # if not a floating rig, the rule logic should not proceed.
        # 7. Depending on the result of the check, utilize the Corva Python SDK Cache to set the string value
        cache.set(key="is_floating", value="0")
        message = "Not a floating rig"
        return {"message": message}
    else:
        cache.set(key="is_floating", value="1")
        return {"message": message}
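The "0" / "1" convention used above has three states: false, true, and not yet cached. A small sketch of helpers that make the three states explicit follows; encode_flag and decode_flag are hypothetical names and are not part of the SDK.

```python
from typing import Optional


def encode_flag(flag: bool) -> str:
    """Encode a boolean as the "1" / "0" string stored in cache."""
    return "1" if flag else "0"


def decode_flag(raw: Optional[str]) -> Optional[bool]:
    """Decode a loaded value; None means the flag has not been cached yet."""
    if raw == "1":
        return True
    if raw == "0":
        return False
    # Missing key or unexpected content: treat as not initialized.
    return None
```

With these helpers, the handler would only call the API when decode_flag returns None.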
2. How to use Python SDK Cache to Set and Get an integer
The following example shows how to set an integer, in this case a timestamp, in cache and get the value.
# 1. Import required functionality.
import statistics

from corva import Api, Cache, Logger, ScheduledDataTimeEvent, scheduled


# 2. Decorate your function using @scheduled. Use the existing lambda_handler function or define your own function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
@scheduled
def lambda_handler(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # 3. Here is where you can declare your variables from the argument event: ScheduledDataTimeEvent and start using Api, Cache and Logger functionalities.
    # The scheduled app can read the following attributes from the ScheduledDataTimeEvent: company_id: The company identifier; asset_id: The asset identifier; start_time: The start time of interval; end_time: The end time of interval
    asset_id = event.asset_id
    company_id = event.company_id
    start_time = event.start_time
    end_time = event.end_time
    # 4. Utilize the attributes from the ScheduledDataTimeEvent to make an API request to corva#wits or any desired time type dataset.
    # You have to fetch the realtime drilling data for the asset based on start and end time of the event.
    # start_time and end_time are inclusive so the query is structured accordingly to avoid processing duplicate data
    # We are only querying for the rop field since that is the only field we need. It is nested under data. We are using the SDK convenience method api.get_dataset. See the API section for more information on convenience methods.
    records = api.get_dataset(
        provider="corva",
        dataset="wits",
        query={
            'asset_id': asset_id,
            'timestamp': {
                '$gte': start_time,
                '$lte': end_time,
            }
        },
        sort={'timestamp': 1},
        limit=500,
        fields="data.rop"
    )
    record_count = len(records)
    # Utilize the Logger functionality. The default log level is INFO. To use a different log level, the log level must be specified in the manifest.json file in the "settings.environment": {"LOG_LEVEL": "DEBUG"}. See the Logger documentation for more information.
    Logger.debug(f"{asset_id=} {company_id=}")
    Logger.debug(f"{start_time=} {end_time=} {record_count=}")
    # statistics.mean raises an error on empty data, so stop early if nothing was fetched.
    if not records:
        return None
    # 5. Implementing some calculations
    # Computing mean rop value from the list of realtime wits records
    rop = statistics.mean(record.get("data", {}).get("rop", 0) for record in records)
    # Utilize the Cache functionality to get a previously set key value.
    # Getting last exported timestamp from Cache. The loaded value is a string, so it is cast back to int.
    last_exported_timestamp = int(cache.get(key='last_exported_timestamp') or 0)
    # Making sure we are not processing duplicate data
    if end_time <= last_exported_timestamp:
        Logger.debug(f"Already processed data until {last_exported_timestamp=}")
        return None
    # 6. This is how to set up a body of a POST request to store the mean rop data and the start_time and end_time of the interval from the event.
    output = {
        # The fetched records only contain data.rop, so the end of the interval is used as the record timestamp.
        "timestamp": end_time,
        "asset_id": asset_id,
        "company_id": company_id,
        "provider": "big-data-energy",
        "collection": "example-scheduled-data-time-app",
        "data": {
            "rop": rop,
            "start_time": start_time,
            "end_time": end_time
        },
        "version": 1
    }
    # Utilize the Logger functionality.
    Logger.debug(f"{output=}")
    # 7. Save the newly calculated data in a custom dataset
    # Utilize the Api functionality. The data needs to be an array because Corva's data is saved as an array of objects, the objects being records. See the Api documentation for more information.
    api.post(
        "api/v1/data/big-data-energy/example-scheduled-data-time-app/", data=[output],
    ).raise_for_status()
    # Utilize the Cache functionality to set a key value. This example saves the timestamp of the output to Cache, cast to a string.
    cache.set(key='last_exported_timestamp', value=str(output["timestamp"]))
    return output
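The duplicate check in the example above comes down to one comparison between the interval end and the cached timestamp. A minimal sketch of that check in isolation, assuming a missing key means nothing has been exported yet; should_process is a hypothetical helper name.

```python
from typing import Optional


def should_process(end_time: int, cached_timestamp: Optional[str]) -> bool:
    """Return True if the interval ends after the last exported timestamp."""
    # A missing key (None) is treated as "nothing exported yet".
    last_exported = int(cached_timestamp or 0)
    return end_time > last_exported
```

Pulling the check into a pure function makes it easy to test without a cache or an event.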
3. How to use Python SDK Cache to Set and Get a float
The following function shows how to set a float for a casing outer diameter in cache and get the value.
# 1. Import required functionality.
from corva import Api, Cache, Logger, ScheduledDataTimeEvent, scheduled


# 2. Decorate your function using @scheduled. Use the existing lambda_handler function or define your own function. It must receive three arguments: event, api and cache. The arguments serve as building blocks for your app.
@scheduled
def outer_diameter_check(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # 3. Here is where you can declare your variables from the argument event: ScheduledDataTimeEvent and start using Api, Cache and Logger functionalities.
    # The scheduled app can read the following attributes from the ScheduledDataTimeEvent: company_id: The company identifier; asset_id: The asset identifier; start_time: The start time of interval; end_time: The end time of interval
    asset_id = event.asset_id
    # 4. Initialize returns
    message = ""
    # 5. Check that the data.casing dataset has a 9.625 inch outer diameter production casing utilizing Corva Python SDK Cache to get the key
    cache_is_production_casing = cache.get(key="is_production_casing")
    # The loaded value is a string, so it is cast back to float before comparing.
    cached_diameter = float(cache_is_production_casing) if cache_is_production_casing is not None else None
    if cached_diameter == 13.375:
        is_production_casing = False
    elif cached_diameter == 9.625:
        is_production_casing = True
    else:  # if the cached value is None, it has not been initialized, so we need to fetch whether it is production casing or not
        # 6. Utilize the attributes from the ScheduledDataTimeEvent to make an API request to the data.casing dataset.
        casing_info = api.get_dataset(
            provider="corva",
            dataset="data.casing",
            query={
                'asset_id': asset_id
            },
            sort={'timestamp': -1},
            limit=1,
            fields="data.components.outer_diameter"
        )
        if casing_info:
            # Assumption: data.components is a list of component dicts; the first component's outer diameter is checked.
            components = casing_info[0].get("data", {}).get("components", [])
            outer_diameter = components[0].get("outer_diameter") if components else None
            is_production_casing = outer_diameter == 9.625
        else:
            Logger.info("Production Casing Rule: Failed to get production casing info.")
            return {"message": message}
    if not is_production_casing:  # if not production casing, the rule logic should not proceed.
        # 7. Depending on the result of the check, utilize the Corva Python SDK Cache to set the float value, cast to a string
        cache.set(key="is_production_casing", value=str(13.375))
        message = "Not production_casing"
        return {"message": message}
    else:
        cache.set(key="is_production_casing", value=str(9.625))
        return {"message": message}
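Since a cached float comes back as a string, comparing it to a number needs a conversion, and exact equality can be brittle for values that were computed rather than typed in. The sketch below shows one possible tolerant comparison; matches_diameter and its default tolerance are assumptions made for illustration, not SDK behavior.

```python
import math
from typing import Optional


def matches_diameter(raw: Optional[str], expected: float, tolerance: float = 1e-6) -> bool:
    """Compare a cached string value with an expected float within a tolerance."""
    if raw is None:
        # Missing key: nothing to compare.
        return False
    try:
        cached = float(raw)
    except ValueError:
        # The cached text is not a number.
        return False
    return math.isclose(cached, expected, abs_tol=tolerance)
```

For example, matches_diameter(cache.get(key="is_production_casing"), 9.625) could replace a direct comparison in the handler.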
4. How to use Python SDK Cache to Set and Get bytes
cache.set(key='key', value=b'')
assert cache.get(key='key') == ''
- Cache can store bytes values.
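Loaded values are decoded to strings, so binary data that is not valid text may not survive the round trip unchanged. One option, offered here as an assumption rather than documented SDK behavior, is to Base64-encode the bytes into a string before saving and decode them after loading. Both helper names are hypothetical.

```python
import base64


def bytes_to_cache_value(payload: bytes) -> str:
    """Encode arbitrary bytes as an ASCII string that is safe to save as a value."""
    return base64.b64encode(payload).decode("ascii")


def cache_value_to_bytes(raw: str) -> bytes:
    """Decode a loaded Base64 string back to the original bytes."""
    return base64.b64decode(raw.encode("ascii"))


original = bytes([0, 255, 16, 128])  # not valid UTF-8 text
restored = cache_value_to_bytes(bytes_to_cache_value(original))
```

Base64 grows the payload by roughly a third, which counts against the recommended cache size.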
5. How to use Python SDK Cache to Set and Get a dict
import json

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='json', value=json.dumps({'int': 0, 'str': 'text'}))
    assert isinstance(cache.get('json'), str)
    assert json.loads(cache.get('json')) == {'int': 0, 'str': 'text'}
- Serialize the dict to a JSON string with json.dumps before saving it to cache.
- The value loaded from cache is a string.
- Deserialize the string with json.loads to get the dict back.
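When the key is missing, cache.get returns None, and json.loads would fail on it. The following sketch shows a defensive loader that falls back to a default; load_json is a hypothetical helper and takes the raw loaded value rather than the cache itself.

```python
import json
from typing import Any, Optional


def load_json(raw: Optional[str], default: Any = None) -> Any:
    """Parse a JSON string loaded from cache, falling back to a default."""
    if raw is None:
        # The key is missing or has expired.
        return default
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # The stored text is not valid JSON.
        return default
```

A handler could then write state = load_json(cache.get(key='json'), default={}) and always work with a dict.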
6. How to use Python SDK Cache to Delete
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='str', value='text')
    assert cache.get(key='str') == 'text'
    cache.delete(key='str')
    assert cache.get(key='str') is None
- Store some data.
- Delete specific key.
- Deleted key is not present in cache.
- To delete all keys at once, use cache.delete_all() (see section 8.2).
7. How to use Python SDK Cache to Set ttl
By default, Cache sets a key expiry of 60 days.
import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    cache.set(key='key', value='value', ttl=1)
    assert cache.get('key') == 'value'
    time.sleep(1)
    assert cache.get('key') is None
- Store the value and set its expiry to 1 second using the ttl parameter.
- Verify that cache contains the data.
- Wait 1 second for the data to expire.
- Verify that the key is no longer in cache.
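Expiry values are easier to read when written as durations and converted to seconds in one place. The sketch below assumes the expiry is passed as a whole number of seconds, as in the ttl=1 example above; ttl_seconds is a hypothetical helper.

```python
from datetime import timedelta


def ttl_seconds(duration: timedelta) -> int:
    """Convert a duration to whole seconds, rejecting durations under one second."""
    seconds = int(duration.total_seconds())
    if seconds <= 0:
        raise ValueError("expiry must be at least one second")
    return seconds


# The default expiry of 60 days, expressed in seconds.
DEFAULT_TTL = ttl_seconds(timedelta(days=60))
ONE_HOUR = ttl_seconds(timedelta(hours=1))
```

A call such as cache.set(key='key', value='value', ttl=ONE_HOUR) then documents its own intent.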
8. How to use Python SDK Cache with Bulk Methods
8.1 How to use Python SDK Cache with Get Many, Get All and Set Many
Cache provides some bulk methods which make it easy to work with multiple keys at once.
import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    assert cache.get_all() == {}
    cache.set_many(
        data=[
            ('key', 'value_1'),
            ('key_with_custom_expiry', 'value_2', 1),
        ]
    )
    assert cache.get_many(
        keys=[
            'key',
            'non-existent-key',
        ]
    ) == {'key': 'value_1', 'non-existent-key': None}
    assert cache.get_all() == {
        'key': 'value_1',
        'key_with_custom_expiry': 'value_2',
    }
    time.sleep(1)
    assert cache.get_all() == {'key': 'value_1'}
- Get all the data from the hash. It is empty as we have not stored anything yet.
- Store multiple key-value pairs at once.
- You can set a custom key expiry in seconds by providing an additional tuple element.
- Get multiple keys at once.
- If you request a non-existent key it will be assigned a value of None.
- Get all the data from the hash.
- Wait for key with custom expiry to expire.
- The expired key is not present anymore.
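The data argument of set_many in the example above is a list of (key, value) or (key, value, expiry) tuples. The sketch below builds such a list from an ordinary dict, casting each value to a string on the way. build_set_many_data is a hypothetical helper, and the tuple layout is taken from the example rather than from an API reference.

```python
from typing import Dict, List, Optional, Tuple, Union

Item = Union[Tuple[str, str], Tuple[str, str, int]]


def build_set_many_data(values: Dict[str, object], ttl: Optional[int] = None) -> List[Item]:
    """Turn a dict into the list of tuples used as data in the example above."""
    items: List[Item] = []
    for key, value in values.items():
        if ttl is None:
            # Two-element tuple: the default expiry applies.
            items.append((key, str(value)))
        else:
            # Three-element tuple: custom expiry in seconds.
            items.append((key, str(value), ttl))
    return items
```

The result could be passed as cache.set_many(data=build_set_many_data({'depth': 1200, 'rop': 35.5})).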
8.2 How to use Python SDK Cache with Delete Many and Delete All
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled


@scheduled
def scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    assert cache.get_all() == {}
    cache.set_many(data=[('k1', 'v1'), ('k2', 'v2'), ('k3', 'v3'), ('k4', 'v4')])
    cache.delete_many(keys=['k1', 'k2'])
    assert cache.get_all() == {'k3': 'v3', 'k4': 'v4'}
    cache.delete_all()
    assert cache.get_all() == {}
- Cache is empty as we have not stored anything yet.
- Store some data.
- Delete multiple keys at once.
- Deleted keys no longer exist.
- Delete all the data.
- Cache is empty.
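A common use of the bulk delete is to clear a related group of keys while leaving the rest alone. The sketch below selects keys by prefix from a dict shaped like the result of get_all in the example above. keys_with_prefix and the "tmp:" naming scheme are assumptions made for illustration.

```python
from typing import Dict, List


def keys_with_prefix(data: Dict[str, str], prefix: str) -> List[str]:
    """Return the keys that start with the given prefix, in sorted order."""
    return sorted(key for key in data if key.startswith(prefix))


# Stand-in for the dict returned by cache.get_all().
snapshot = {"tmp:a": "1", "tmp:b": "2", "last_exported_timestamp": "1675963750"}
to_delete = keys_with_prefix(snapshot, "tmp:")
```

The selected keys could then be removed with cache.delete_many(keys=to_delete).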